package day04;

import beans.SensorReading;
import day03.window.FlinkWindow00;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Monitors temperature sensor readings and emits an alert when the temperature of a sensor
 * keeps rising for 10 seconds (processing time).
 *
 * @author lvbingbing
 * @date 2022-01-11 13:48
 */
public class ProcessFunction02 {
    /**
     * Job entry point: builds the pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the FlinkWindow00 helper. Per the original author's note, its constructor
        //    initializes the environment and reads input from a socket text stream.
        int parallelism = 1;
        FlinkWindow00 flinkWindow = new FlinkWindow00(parallelism);
        // 2. Obtain the execution environment
        StreamExecutionEnvironment env = flinkWindow.getEnv();
        // 3. Attach the "temperature keeps rising" alert
        temperatureKeepRisingWarning(flinkWindow.getSingleOutputStreamOperator());
        // 4. Trigger job execution
        env.execute();
    }

    /**
     * Keys the stream by the {@code id} field, applies {@link TemperatureKeepRisingWarning}
     * with a 10 second interval and prints the resulting alerts.
     *
     * @param sensorReadingStream stream of sensor readings
     */
    private static void temperatureKeepRisingWarning(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        sensorReadingStream.keyBy("id")
                .process(new TemperatureKeepRisingWarning(10))
                .print("温度持续上升...");
    }

    /**
     * Emits an alert for a key when its temperature has not stopped rising for
     * {@code interval} seconds of processing time.
     *
     * <p>Mechanism: the first rising reading registers a processing-time timer
     * {@code interval} seconds in the future. Any reading that is not strictly higher than
     * the previous one cancels the pending timer. If the timer fires, the alert is emitted.
     *
     * <p>Keyed state is deliberately not touched in {@code close()}: that callback is not
     * invoked on behalf of a particular key, so clearing keyed state there is not meaningful.
     */
    private static class TemperatureKeepRisingWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {

        private static final long serialVersionUID = -6726791195921870872L;

        /**
         * Length of the observation interval, in seconds.
         */
        private final Integer interval;

        /**
         * Temperature of the previous reading for the current key; {@code null} before the first reading.
         */
        private transient ValueState<Double> lastTemperatureState;

        /**
         * Timestamp of the currently registered timer for the current key; {@code null} if none is pending.
         */
        private transient ValueState<Long> timerTimestampState;

        /**
         * @param interval number of seconds the temperature must keep rising before an alert is emitted
         */
        public TemperatureKeepRisingWarning(Integer interval) {
            this.interval = interval;
        }

        /**
         * Obtains the handles for the two keyed states used by this function.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTemperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("last-temperature", Double.class));
            timerTimestampState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer-timestamp", Long.class));
        }

        /**
         * Compares the incoming reading with the previous one for the same key and registers
         * or cancels the alert timer accordingly.
         *
         * @param sensorReading the incoming reading
         * @param context       gives access to the timer service
         * @param out           unused here; alerts are emitted from {@link #onTimer}
         */
        @Override
        public void processElement(SensorReading sensorReading, KeyedProcessFunction<Tuple, SensorReading, String>.Context context, Collector<String> out) throws Exception {
            Double lastTemperature = lastTemperatureState.value();
            Long timerTimestamp = timerTimestampState.value();
            Double currentTemperature = sensorReading.getTemperature();

            // Remember the CURRENT reading so the next element is compared against it.
            // (Writing the old value back would freeze the baseline forever.)
            lastTemperatureState.update(currentTemperature);

            if (lastTemperature == null) {
                // First reading for this key: nothing to compare against yet.
                return;
            }

            TimerService timerService = context.timerService();
            if (currentTemperature > lastTemperature) {
                // Rising: start waiting only if no timer is pending, so a continued rise
                // does not keep pushing the deadline further out.
                if (timerTimestamp == null) {
                    long fireAt = timerService.currentProcessingTime() + interval * 1000L;
                    timerService.registerProcessingTimeTimer(fireAt);
                    timerTimestampState.update(fireAt);
                }
            } else if (timerTimestamp != null) {
                // Not rising: cancel the pending timer. The null guard matters because
                // deleteProcessingTimeTimer is called with an unboxed value, so a missing
                // timer would otherwise cause a NullPointerException.
                timerService.deleteProcessingTimeTimer(timerTimestamp);
                timerTimestampState.clear();
            }
        }

        /**
         * Fires when the temperature kept rising for the whole interval: emits the alert and
         * clears the timer state so a later rise can start a new interval.
         *
         * @param timestamp the timestamp of the firing timer
         * @param context   gives access to the key the timer was registered for
         * @param out       collector the alert message is written to
         */
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Tuple, SensorReading, String>.OnTimerContext context, Collector<String> out) throws Exception {
            // The key is a Tuple because the stream is keyed by field name; field 0 is the key value.
            Tuple currentKey = context.getCurrentKey();
            String sensorId = currentKey.getField(0);
            out.collect("传感器 " + sensorId + " 温度值连续 " + interval + "s 上升");
            timerTimestampState.clear();
        }
    }
}
